iT邦幫忙

2022 iThome 鐵人賽

DAY 9
2
Software Development

軟體架構師的自我修養系列 第 9

[Day 9] 訊息佇列的各種考量

  • 分享至 

  • xImage
  •  

昨天我們演示了CQRS的系統演化,將一個單體架構一步步演化成一個事件驅動架構。在事件驅動架構中,有一個靈魂角色是訊息佇列,扮演訊息存儲和傳遞的角色。

但有這麼多訊息佇列系統和實踐方法,又該如何在其中進行選擇?在這篇文章中,我們會仔細介紹訊息佇列的各個考量點,並且以Redis作為示範來解釋每個考量點的影響。

此外,Redis雖然廣泛被作為快取使用,但實際上以Redis為核心的訊息佇列實踐也不少見,因此,這篇文章也能作為一個快速入門。

訊息佇列

在選擇一個訊息佇列時有許多面向要考慮:

  1. 傳播方式
  2. 抵達方式
  3. 持久化
  4. 水平擴展

在這節我會解釋每個面向。

傳播方式

這裡的傳播方式指的是一個訊息要怎麼送到目的地,有兩種傳播型態:

  • 一對一
  • 一對多(扇出)

一對一很容易理解,當訊息生產者發送一個訊息進入訊息佇列,這個訊息只會傳送給一個消費者收到。

一對多則是當生產者送出一個訊息後,該訊息會被多個消費者收到。值得一提的是,生產者只送出一個訊息,但是這個訊息會被訊息佇列複製給多個消費者。這樣的行為也稱為扇出。

抵達方式

抵達方式很有趣,多數的訊息佇列都有自己的抵達保證。有三種常見的保證:

  • 至多一次(at-most-once)
  • 至少一次(at-least-once)
  • 僅有一次(exactly-once)

至多一次相對容易達成。基本上只要是訊息佇列,至少都有這個保證。

訊息消費者可能收到一次訊息,或根本沒收到。沒收到的情況有幾種,首先,訊息因為網路傳輸的問題遺失了,或者,雖然消費者有收到,但在處理的過程中出現錯誤,導致訊息沒正確完成。

在至多一次保證下,訊息遺失就再也找不回來了。

至少一次通常在一些主流的訊息佇列上很常見,例如RabbitMQ或Kafka等。相較於至多一次,至少一次是一個較強的保證。訊息佇列可以保證每個訊息都有被處理。

儘管如此,訊息也有可能被處理很多次。舉例來說,一個消費者在處理完後並沒有知會訊息佇列,因此訊息佇列又將訊息遞送一次。在至少一次的保證下,保持訊息處理的冪等性是非常重要的。

僅有一次是最嚴格的保證。在僅有一次保證下,所有的訊息只會被處理一次。即使是熱門的訊息佇列也不一定有支持這樣的保證,例如RabbiMQ就沒有。但若是正確設定和使用Kafka,是可以做到僅有一次的,只是會犧牲掉效能。

持久化

持久化指的是當訊息送進佇列後會不會消失。這也有三種持久化型態:

  • 存在記憶體
  • 存在硬碟
  • 雙軌制

我們都知道這些代表什麼意思。

但有趣的是,使用硬碟持久化會比較慢嗎?答案是不一定。

這取決於持久化是怎麼實作的,Kafka利用LSM-tree和零複製的技術做到即便存硬碟也不會損失效能,甚至效能比存記憶體的RabbitMQ更好。

另一個存硬碟也不損失效能的例子是Cassandra資料庫,他同樣利用LSM-tree的方式提升效能,甚至讓Cassandra是一個很擅長寫入的資料庫。

雙軌制是一個特殊模式,為了提升效能,訊息佇列首先將訊息寫入記憶體,並定期寫入硬碟以保障持久化。RabbitMQ就是個雙軌制典型的例子,值得一提的是,RabbitMQ也可以設定成只存硬碟。

水平擴展

我認為,水平擴展是一個訊息佇列最重要的考量。

處理訊息通常很花時間,因此我們必須要啟用更多消費者來處理訊息,也就是水平擴展。

要支援水平擴展有個很重要的特性稱為消費者群組。消費者群組讓處理訊息的消費者不再是一個,而可以是多個,同時,消費者群組也可以套用在一對多的情況。使一個訊息送至多個消費者群組,而不是一個。

Redis訊息佇列

討論完訊息佇列的一些特性,讓我們來看看Redis如何成為一個訊息佇列,並且檢視每個訊息佇列特性的意涵。

Redis要作為訊息佇列有三種做法:

  • Pub/Sub
  • 列表(List)
  • 串流(Stream)

我們會一個一個介紹,最後提供一個總結。

Pub/Sub

Pub/Sub是一個常見的通知方案,這個功能幾乎跟Redis一起誕生。消費者透過SUBSCRIBE訂閱一個主題,在生產者PUBLISH訊息進主題後就會收到。

身為一個傳統Pub/Sub的功能,這當然也支援扇出的機制。更有甚者,也可以透過PSUBSCRIBE做到一定程度的訊息路由。

但是,Pub/Sub並不是Redis的熱門功能。最大的問題是,訊息只有至多一次保證。當訊息送出,但消費者沒正確接到或成功處理,這個訊息就會消失。

更糟糕的是,Redis並沒有持久化訊息。所以當Redis故障,所有的訊息就直接消失。

讓我們總結一下Pub/Sub:

  • 一對一和一對多都可以支援
  • 至多一次保證
  • 沒有持久化
  • 沒有水平擴展

列表

列表在Redis中是一個有用的資料結構,透過列表也可以簡單做到先進先出佇列(FIFO)。秘訣是使用BLPOP等待訊息進入列表,但建議加個超時限制。

根據上圖,我們可以看到如果有多個消費者在等待同一個列表,那麼他們就會自動變成一個消費者群組而不需要額外設定。

另一方面,列表無法把訊息扇出。一但一個訊息被消費者接走,其他人就再也收不到了,即便這個訊息沒被成功處理。

儘管如此,列表是能夠在記憶體中持久化訊息的,此外,如果有開啟AOFRDB那麼訊息可以備份在硬碟上。不過我必須要說,Redis並不是完全持久化。

總結一下:

  • 一對一可以,但一對多不行
  • 至多一次保證
  • 持久化在記憶體,備份在硬碟
  • 可以水平擴展

串流

介紹完Pub/Sub和串流,我們注意到這兩個都不怎麼好,這兩個方案都有自己的缺點。因此,串流作為解決方案在Redis 5.0版本之後被提出。

因為串流複雜多了,因此我們先總結一下串流的特性:

  • 一對一和一對多都可以支援
  • 「至少一次」保證
  • 持久化在記憶體,備份在硬碟
  • 可以水平擴展

就結果來說,串流解決Pub/Sub和列表所有碰到的問題,並且額外提供至少一次保證。

這個圖形長相像是Pub/Sub但工作流程卻更接近列表。

生產者可以在任何時間透過XADD將訊息送入Redis,可以想像成是有一個列表把所有訊息保存下來。消費者也可以在任何時間透過XREAD將訊息取出,在XREAD後面用到的運算元指的是要從哪開始拿訊息。

  • $: 無論之前有多少訊息,我都只從現在開始拿。
  • 0-0: 總是從頭開始拿。
  • <id>: 從一個特定的訊息開始拿。

除了單一消費者外,串流也支援消費者群組。

為了做到至少一次保證,就像多數訊息佇列一樣,消費者必須在成功處理訊息後利用XACK知會Redis

<是一個特殊的運算元,指的是從群組內還沒有人拿過的訊息開始。

解釋完整個流程,我用一個實際的Nodejs範例示範消費者的正確行為。

let lastid = "0-0";
let checkBacklog = true
while (true) {
    const myid = checkBacklog ? lastid : ">";
    const items = await redis.xreadgroup('GROUP',GroupName,ConsumerName,'BLOCK','2000','COUNT','10','STREAMS',StreamName,myid);

    if (!items) continue;
    
    checkBacklog = !(items[0][1].length == 0);
    
    items[0][1].forEach(elem => {
        const [id, fields] = elem;
        await processMessage(id,fields);
        await redis.xack(StreamName,GroupName,id);
        lastid = id;
    });
}

請注意,所有的消費者都有他自己的名字ConsumerName

首先,消費者先從最前頭讀取,用來判斷自己最後的位置。如果回應是空陣列表示消費者可以從lastid知道該從哪開始。最後知會Redis結束的id

消費者故障轉移

在分散式系統中,我們很難命名一個消費者。舉例來說,消費者可能在k8s中執行,那我們該如何命名每個pod?就算我們可以在一開始鎖定名字,我們該怎麼面對水平擴展後產生的新消費者?因此,在分散式系統中命名是不實際的。

就算如此,我們也不能用uuid來命名消費者,然後用過就丟。因為Redis會維護一個名字和最後位置的對應表。如果每次都建立一個隨機名字,那麼那張對應表就會越來越大,最糟的是,那些被取出卻沒知會的訊息再也不可能被處理。

好在,Redis提供一個方法讓人認領待處理的訊息。工作流程如下:

  1. 找出所有待處理的訊息。
  2. 認領那些訊息,並轉移所有權。

因此,整個消費者的啟動流程如下:

  1. XPENDING StreamName GroupName
  2. XCLAIM StreamName GroupName <ConsumerName in uuid> <min-idle-time> <ID-1> <ID-2> ... <ID-N>
  3. 接續上面的啟動流程

min-idle-time是一個有用的機制,我們可以避免很多消費者同時認領一個訊息。當第一個消費者開始認領訊息時,這個訊息就不再是閒置了,因此其餘消費者也無法重複認領。

透過正確使用XCLAIM,我們就可以使用隨機名字也不用擔心對應表會不斷長大,因為訊息會被別人認領走,而對應表中沒用的紀錄會被刪除。

串流持久化

當串流內保存越來越多訊息,記憶體用量會變成Redis的災難。

如果我們去看Redis的串流手冊,會發現有一個XDEL可以用。但XDEL實際上並不會真的刪除訊息,而是將訊息標註成不可用,但訊息還是在那。

那我們該如何避免記憶體洩漏?

我們可以在XADD時使用MAXLEN這個參數:

XADD StreamName MAXLEN 1000 * foo bar

但有件事得要知道,MAXLEN會影響Redis的效能,他會卡住Redis的主執行緒直到清除完畢,此時其他Redis命令都無法執行。如果有許多訊息不斷流入,那Redis會很忙於維持MAXLEN

有個替代方案,與其固定長度,不如讓Redis自己決定該要留多長,並在Redis的空閑時間處理。因此,命令會變成:

XADD StreamName MAXLEN ~ 1000 * foo bar

這個~符號表示最大長度約略是1000,但可以是900也可以是1300,Redis會自己選擇適合的長度並在適合的時間處理。

結論

讓我們總結一下這三個方案。

Pub/Sub 列表 串流
扇出 V V
遞送保證 At-most-once At-most-once At-least-once
水平擴展 V V
持久化 AOF, RDB AOF, RDB
複雜度 low low high

有一個剛沒提到的特性,複雜度,這指的不只是技術的複雜度,同時也是實作生產者的複雜度。

就我觀點來說,這每個方案都有自己的優缺點,也有自己適合的場景。

  • Pub/Sub:盡力而為(best-effort)的通知。
  • 列表:能容忍一些訊息遺失的佇列。
  • 串流:寬鬆的串流處理。

值得一提的是,為什麼Redis串流是鬆散的串流處理?

因為Redis的消費者群組不如Kafka,他無法保證訊息的順序。若是無法保證訊息順序,那在一個高流量的環境,也會很難成功做到水平擴展。


上一篇
[Day 8] 從單體演化成CQRS實戰
下一篇
[Day 10] 事件驅動架構的設計模式(上)
系列文
軟體架構師的自我修養31
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言